use std::collections::HashMap;
use std::io::{stdin, Write};
use std::process::{Command, Stdio};
use std::{env, process, thread};

use log::{error, info};
use sysinfo::System;
use ureq::Error;

use args::CmdLineArgs;
use config::IndexerConfig;

use crate::endpoint::Message;

mod args;
mod config;
mod datasource;
mod endpoint;
mod global;
mod summary;
mod task;

/// indexer entry
fn main() {
    //read arguments from command line
    let args = global::get_global_args();

    //to kill process already started
    if args.is_stop() {
        kill_self();
        return;
    }

    //read configurations
    let config = global::get_global_config();

    //check every part of configurations (datasource, endpoint and tasks)
    if args.is_test() {
        config.test();
        return;
    }

    //execute data clean
    if args.is_clean() {
        clean(&config);
        return;
    }

    //execute initial procedure (such as create trigger, table, full data put , etc.)
    if args.is_init() {
        init(&config);
        return;
    }

    if args.is_start() {
        //start daemon
        let p_args: Vec<String> = env::args().collect();
        let program = &p_args[0];
        Command::new(program)
            .args(args.start_args())
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()
            .unwrap();
    } else {
        info!("started.");
        start(&config, &args);
    }
}

/// initialize database and triggers
///
/// # Arguments
/// * `config` global configurations from `indexer.yml`
///
fn init(config: &IndexerConfig) {
    let mut error = false;
    if confirm("Create tasks table and triggers (y/N)? ") {
        //get all dedup datasources used in tasks
        let mut ds_names: Vec<String> =
            config.tasks().iter().map(|t| t.1.datasource.clone()).collect();
        ds_names.dedup();
        //init the datasource
        for ds_name in ds_names {
            let datasource = config
                .datasource(&ds_name)
                .expect(&format!("task datasource [\"{}\"] undefined.", ds_name));
            //get tasks that use this datasource
            let mut tasks = HashMap::new();
            tasks.extend(config.tasks().iter().filter(|t| t.1.datasource == ds_name));
            if let Ok(ids) = datasource.inst() {
                if let Err(e) = ids.init(&tasks) {
                    error!("failed to init datasource, reason: {}", e);
                    error = true;
                }
            }
        }
        if !error {
            info!("indexer initialized.");
        }
    }
    if !error && confirm("Begin to execute full synchronization (y/N)? ") {
        match begin_sync(&config) {
            Ok(_) => info!("indexer full synchronized."),
            Err(Error::Status(code, resp)) => {
                if let Ok(msg) = resp.into_json::<Message>() {
                    error!("sync failed with code {}, reason: {:?}", msg.error, msg.message);
                } else {
                    error!("sync failed with code {}", code);
                }
            }
            Err(Error::Transport(t)) => {
                if let Some(msg) = t.message() {
                    error!("sync failed, reason: {}", msg);
                }
            }
        }
    }
}

/// clean tables, functions and triggers that indexer create.
/// when the user is no longer using the indexer, this function is to purge the data
fn clean(config: &IndexerConfig) {
    if !confirm("Are you sure clear relevant data created by the indexer? (y/N)? ") {
        return;
    }
    let mut ds_names: Vec<String> = config.tasks().iter().map(|t| t.1.datasource.clone()).collect();
    ds_names.dedup();
    //init the datasource
    for ds_name in ds_names {
        let datasource = config
            .datasource(&ds_name)
            .expect(&format!("task datasource [\"{}\"] undefined.", ds_name));
        //get tasks that use this datasource
        let mut tasks = HashMap::new();
        tasks.extend(config.tasks().iter().filter(|t| t.1.datasource == ds_name));
        if let Ok(ids) = datasource.inst() {
            if let Err(e) = ids.clean(&tasks) {
                error!("failed to clean datasource, reason: {}", e);
            } else {
                info!("datasource [\"{}\"] cleaned.", ds_name);
            }
        }
    }
}

/// synchronize all tasks
///
/// # Arguments
/// * `config` global configurations from `indexer.yml`
///
fn begin_sync(config: &IndexerConfig) -> Result<(), Error> {
    let token = config.endpoint().token()?;
    for task in config.tasks().iter() {
        let datasource = config
            .datasource(&task.1.datasource)
            .expect(&format!("{}: task datasource [\"{}\"] undefined.", task.0, task.1.datasource));
        let index = config
            .index(&task.1.index)
            .expect(&format!("{}: task index [\"{}\"] undefined.", task.0, task.1.index));
        let ids =
            datasource.inst().expect(&format!("{}: failed getting datasource instance", task.0));

        let mut offset: u32 = 0;
        let mut scroll_id: u64 = 0;
        loop {
            //get current timestamp
            let batch_sql_start_time = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("Time went backwards");

            let batch_size = task.1.batch_size();
            //read all records to sync
            match ids.records(task.1, batch_size, offset, scroll_id) {
                Ok(records) => {
                    if records.len() == 0 {
                        break;
                    }
                    //get current timestamp
                    let batch_sql_end_time = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .expect("Time went backwards");

                    let new_records = records.iter().map(|r| task.1.mapping_value(r)).collect();
                    //push records to endpoint

                    'retry: loop {
                        match config.endpoint().inserts(&token, &new_records, index) {
                            Ok(_) => {
                                //get current timestamp
                                let push_end_time = std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .expect("Time went backwards");
                                info!(
                                    "{} {} pushed to indexea [{},{}] in {}ms, records loaded in {}ms.",
                                    records.len(),
                                    task.0,
                                    index.app,
                                    index.index,
                                    push_end_time.as_millis() - batch_sql_end_time.as_millis(),
                                    batch_sql_end_time.as_millis() - batch_sql_start_time.as_millis()
                                );
                                break 'retry;
                            }
                            Err(Error::Status(code, resp)) => {
                                if let Ok(msg) = resp.into_json::<Message>() {
                                    error!(
                                        "{}: push records failed, code: {}, reason: {:?}",
                                        task.0, code, msg.message
                                    );
                                } else {
                                    error!("{}: push records failed, code: {}", task.0, code);
                                }
                                break 'retry;
                            }
                            Err(Error::Transport(t)) => {
                                if let Some(msg) = t.message() {
                                    error!(
                                        "{}: push records failed, reason: {}, waiting for retry...",
                                        task.0, msg
                                    );
                                }
                                //sleep and retry
                                thread::sleep(std::time::Duration::from_secs(5));
                            }
                        };
                    }

                    if records.len() < batch_size as usize {
                        break;
                    }
                    //get last element of records and read it id field
                    let primary_field = task.1.primary.as_ref().unwrap();
                    scroll_id = records.last().unwrap()[primary_field].as_u64().unwrap();

                    offset = offset + batch_size;
                }
                Err(e) => {
                    error!("{}: failed to list records, reason: {:?}", task.0, e);
                    break;
                }
            }
        }
    }
    Ok(())
}

/// check table `indexea_tasks` for increment data
/// # Arguments
/// * `config` global configurations from `indexer.yml`
///
fn start(config: &IndexerConfig, args: &CmdLineArgs) {
    let mut children = vec![];
    for task in config.tasks().iter() {
        if let Some(datasource) = config.datasource(&task.1.datasource) {
            //copy variables to thread
            let t_name: String = task.0.clone();
            let source = datasource.clone();
            let end_p = config.endpoint().clone();
            let this_task = task.1.clone();
            let verbose = args.is_verbose();
            let child = thread::spawn(move || this_task.start(&t_name, &source, &end_p, verbose));
            children.push(child);
        }
    }
    for child in children {
        // Wait for the thread to finish. Returns a result.
        let _ = child.join();
    }
}

/// operation confirm
fn confirm(msg: &str) -> bool {
    print!("{}", msg);
    std::io::stdout().flush().unwrap();
    let mut buffer = String::new();
    if let Ok(_) = stdin().read_line(&mut buffer) {
        return buffer.trim().eq("y");
    }
    false
}

/// kill process of indexer
fn kill_self() {
    let mut system = System::new_all();
    system.refresh_processes();
    let current_process_id = process::id();
    let ps = system
        .processes_by_exact_name("indexer")
        .filter(|p| p.pid().as_u32() != current_process_id);
    for p in ps {
        p.kill();
        println!("{}({}) exited.", p.exe().expect("indexer").display(), p.pid().as_u32());
    }
}
